"""
Set up the Salt integration test suite
"""

import atexit
import copy
import errno
import logging
import multiprocessing
import os
import pprint
import re
import shutil
import signal
import socket
import stat
import subprocess
import sys
import tempfile
import threading
import time
from datetime import datetime, timedelta

import salt
import salt.config
import salt.log.setup as salt_log_setup
import salt.master
import salt.minion
import salt.output
import salt.runner
import salt.utils.color
import salt.utils.files
import salt.utils.msgpack
import salt.utils.path
import salt.utils.platform
import salt.utils.process
import salt.utils.stringutils
import salt.utils.yaml
import salt.version
from salt.exceptions import SaltClientError
from salt.ext import six
from salt.utils.immutabletypes import freeze
from salt.utils.verify import verify_env
from tests.support.cli_scripts import ScriptPathMixin
from tests.support.helpers import RedirectStdStreams, requires_sshd_server
from tests.support.mixins import (
    AdaptedConfigurationTestCaseMixin,
    CheckShellBinaryNameAndVersionMixin,
    SaltClientTestCaseMixin,
    SaltMinionEventAssertsMixin,
    SaltReturnAssertsMixin,
    ShellCaseCommonTestsMixin,
)
from tests.support.parser import PNUM, SaltTestcaseParser, print_header
from tests.support.paths import *  # pylint: disable=wildcard-import
from tests.support.processes import *  # pylint: disable=wildcard-import
from tests.support.processes import SaltMaster, SaltMinion, SaltSyndic
from tests.support.runtests import RUNTIME_VARS
from tests.support.unit import TestCase

try:
    import pwd
except ImportError:
    pass


try:
    import salt.ext.six.moves.socketserver as socketserver  # pylint: disable=no-name-in-module
except ImportError:
    import socketserver


log = logging.getLogger(__name__)

# Maps reserved port -> the socket holding it, so the port stays unavailable
# to the OS for the rest of the test run (sockets are closed at exit).
_RUNTESTS_PORTS = {}

# Ports hard-coded in the test configuration files; never hand these out.
_HARDCODED_TEST_PORTS = (
    54505,
    54506,
    64505,
    64506,
    64507,
    64508,
    64510,
    64511,
    64520,
    64521,
)


def get_unused_localhost_port():
    """
    Return a random unused port on localhost.

    The bound socket is stashed in ``_RUNTESTS_PORTS`` so the port cannot be
    handed out twice during the run (sockets are closed at exit via atexit).
    On macOS/BSD/AIX the socket is closed immediately instead, and on
    macOS/AIX a port already present in ``_RUNTESTS_PORTS`` is rejected.
    """
    usock = socket.socket(family=socket.AF_INET, type=socket.SOCK_STREAM)
    usock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    usock.bind(("127.0.0.1", 0))
    port = usock.getsockname()[1]
    if port in _HARDCODED_TEST_PORTS:
        # These ports are hardcoded in the test configuration.  Keep the
        # current socket bound while recursing so the OS cannot hand the
        # very same port back; only close it once we have a replacement.
        port = get_unused_localhost_port()
        usock.close()
        return port

    darwin = sys.platform.startswith("darwin")
    bsd = "bsd" in sys.platform
    aix = sys.platform.startswith("aix")

    if (aix or darwin) and port in _RUNTESTS_PORTS:
        # Port was already handed out earlier in this run; try again.
        port = get_unused_localhost_port()
        usock.close()
        return port

    _RUNTESTS_PORTS[port] = usock

    if darwin or bsd or aix:
        # NOTE(review): on these platforms the reserving socket is closed
        # right away — presumably keeping it bound interferes with the
        # daemons binding the port later; confirm before changing.
        usock.close()

    return port


def close_open_sockets(sockets_dict):
    """
    Close every socket held in *sockets_dict*, emptying the dict as we go.
    """
    while sockets_dict:
        _, sock = sockets_dict.popitem()
        sock.close()


# Release any sockets still held open to reserve ports (see
# get_unused_localhost_port) when the test-runner process exits.
atexit.register(close_open_sockets, _RUNTESTS_PORTS)


# Port on which the log-forwarding TCP server (ThreadedSocketServer +
# SocketServerRequestHandler below) listens for daemon log records.
SALT_LOG_PORT = get_unused_localhost_port()


class ThreadingMixIn(socketserver.ThreadingMixIn):
    # Handler threads are created non-daemonic.
    daemon_threads = False


class ThreadedSocketServer(ThreadingMixIn, socketserver.TCPServer):
    """
    Threaded TCP server that exposes a ``shutting_down`` event so request
    handlers can notice when the server is being closed.
    """

    allow_reuse_address = True

    def server_activate(self):
        """Create the shutdown event before the server starts accepting."""
        self.shutting_down = threading.Event()
        super().server_activate()

    def server_close(self):
        """Signal handlers to stop, then close the listening socket."""
        shutting_down = getattr(self, "shutting_down", None)
        if shutting_down is not None:
            shutting_down.set()
        super().server_close()


class SocketServerRequestHandler(socketserver.StreamRequestHandler):
    """
    Receive msgpack-serialized log record dicts over the connection and
    re-emit them through the local logging hierarchy.
    """

    def handle(self):
        """
        Read from the socket until the peer disconnects or the owning
        ThreadedSocketServer sets its ``shutting_down`` event.
        """
        unpacker = salt.utils.msgpack.Unpacker(raw=False)
        while not self.server.shutting_down.is_set():
            try:
                wire_bytes = self.request.recv(1024)
                if not wire_bytes:
                    # Peer closed the connection.
                    break
                unpacker.feed(wire_bytes)
                for record_dict in unpacker:
                    # Dispatch through the logger named in the record so the
                    # usual level filtering and handlers apply.
                    record = logging.makeLogRecord(record_dict)
                    logging.getLogger(record.name).handle(record)
            except (EOFError, KeyboardInterrupt, SystemExit):
                break
            except OSError as exc:
                # errno.WSAECONNRESET only exists on Windows; elsewhere
                # getattr() yields None and the comparison is simply False.
                if exc.errno == getattr(errno, "WSAECONNRESET", None):
                    # Connection reset on windows
                    break
                log.exception(exc)
            except Exception as exc:  # pylint: disable=broad-except
                log.exception(exc)


class TestDaemonStartFailed(Exception):
    """
    Simple exception to signal that a test daemon (master, minion, syndic
    or proxy) failed to start; raised by TestDaemon.start_zeromq_daemons /
    start_tcp_daemons.
    """


class TestDaemon:
    """
    Set up the master and minion daemons, and run related cases
    """

    MINIONS_CONNECT_TIMEOUT = MINIONS_SYNC_TIMEOUT = 600

    def __init__(self, parser):
        """
        Remember the test-run parser and build the color table used for
        status output.
        """
        self.parser = parser
        # Colors only when --no-colors is explicitly False (not merely unset).
        enable_colors = self.parser.options.no_colors is False
        self.colors = salt.utils.color.get_colors(enable_colors)
        if salt.utils.platform.is_windows():
            # There's no shell color support on windows...
            for key in self.colors:
                self.colors[key] = ""

    def __enter__(self):
        """
        Start the master/minion daemons for the configured transport, wait
        for the minions to connect and sync, optionally prepare the SSH
        subsystem, and print the versions/grains reports when requested.

        Returns this TestDaemon instance.
        """
        # Setup the multiprocessing logging queue listener
        salt_log_setup.setup_multiprocessing_logging_listener(self.master_opts)

        # Set up PATH to mockbin
        self._enter_mockbin()

        # Minion IDs whose presence is awaited by wait_for_minions() below.
        self.minion_targets = {"minion", "sub_minion"}

        if self.parser.options.transport == "zeromq":
            self.start_zeromq_daemons()
        elif self.parser.options.transport == "tcp":
            self.start_tcp_daemons()

        self.pre_setup_minions()
        self.setup_minions()

        if getattr(self.parser.options, "ssh", False):
            self.prep_ssh()

        self.wait_for_minions(time.time(), self.MINIONS_CONNECT_TIMEOUT)

        if self.parser.options.sysinfo:
            # The TypeError fallbacks below handle print_header
            # implementations that do not accept a `width` argument.
            try:
                print_header(
                    "~~~~~~~ Versions Report ",
                    inline=True,
                    width=getattr(self.parser.options, "output_columns", PNUM),
                )
            except TypeError:
                print_header("~~~~~~~ Versions Report ", inline=True)

            print("\n".join(salt.version.versions_report()))

            try:
                print_header(
                    "~~~~~~~ Minion Grains Information ",
                    inline=True,
                    width=getattr(self.parser.options, "output_columns", PNUM),
                )
            except TypeError:
                print_header("~~~~~~~ Minion Grains Information ", inline=True)

            grains = self.client.cmd("minion", "grains.items")

            # Copy so the color flag does not leak into self.minion_opts.
            minion_opts = self.minion_opts.copy()
            minion_opts["color"] = self.parser.options.no_colors is False
            salt.output.display_output(grains, "grains", minion_opts)

        try:
            print_header(
                "=",
                sep="=",
                inline=True,
                width=getattr(self.parser.options, "output_columns", PNUM),
            )
        except TypeError:
            print_header("", sep="=", inline=True)

        # post_setup_minions() must always run, even though we return self.
        try:
            return self
        finally:
            self.post_setup_minions()

    def _clear_status_line(self):
        """
        Blank out the current terminal line (carriage-return trick) so the
        next status message replaces the pending 'Starting ...' one.
        """
        sys.stdout.write(
            "\r{}\r".format(
                " " * getattr(self.parser.options, "output_columns", PNUM)
            )
        )

    def _start_daemon_with_status(self, display_name, **start_daemon_kwargs):
        """
        Start one test daemon through start_daemon(), wrapping it with the
        colored 'Starting ...' / 'STARTED!' / 'FAILED!' status messages.

        display_name        -- human-readable daemon name for the messages.
        start_daemon_kwargs -- passed straight through to start_daemon().

        Returns the started daemon process object.
        Raises TestDaemonStartFailed when start_daemon() signals failure via
        RuntimeWarning/RuntimeError.
        """
        try:
            sys.stdout.write(
                " * {}Starting {} ... {}".format(
                    self.colors["LIGHT_YELLOW"], display_name, self.colors["ENDC"]
                )
            )
            sys.stdout.flush()
            process = start_daemon(**start_daemon_kwargs)
            self._clear_status_line()
            sys.stdout.write(
                " * {}Starting {} ... STARTED!\n{}".format(
                    self.colors["LIGHT_GREEN"], display_name, self.colors["ENDC"]
                )
            )
            sys.stdout.flush()
            return process
        except (RuntimeWarning, RuntimeError):
            self._clear_status_line()
            sys.stdout.write(
                " * {}Starting {} ... FAILED!\n{}".format(
                    self.colors["LIGHT_RED"], display_name, self.colors["ENDC"]
                )
            )
            sys.stdout.flush()
            raise TestDaemonStartFailed()

    def start_zeromq_daemons(self):
        """
        Fire up the daemons used for zeromq tests: the log-forwarding TCP
        server, salt-master, two salt-minions, a syndic master + syndic
        and, when --proxy was given, a salt-proxy.
        """
        # TCP server collecting msgpack-serialized log records from daemons.
        self.log_server = ThreadedSocketServer(
            ("localhost", SALT_LOG_PORT), SocketServerRequestHandler
        )
        self.log_server_process = threading.Thread(target=self.log_server.serve_forever)
        self.log_server_process.start()

        self.master_process = self._start_daemon_with_status(
            "salt-master",
            daemon_name="salt-master",
            daemon_id=self.master_opts["id"],
            daemon_log_prefix="salt-master/{}".format(self.master_opts["id"]),
            daemon_cli_script_name="master",
            daemon_config=self.master_opts,
            daemon_config_dir=RUNTIME_VARS.TMP_CONF_DIR,
            daemon_class=SaltMaster,
            bin_dir_path=SCRIPT_DIR,
            fail_hard=True,
            event_listener_config_dir=RUNTIME_VARS.TMP_CONF_DIR,
            start_timeout=120,
        )

        # NOTE(review): daemon_id below uses master_opts["id"] while the log
        # prefix uses minion_opts["id"] — preserved as-is; confirm intent.
        self.minion_process = self._start_daemon_with_status(
            "salt-minion",
            daemon_name="salt-minion",
            daemon_id=self.master_opts["id"],
            daemon_log_prefix="salt-minion/{}".format(self.minion_opts["id"]),
            daemon_cli_script_name="minion",
            daemon_config=self.minion_opts,
            daemon_config_dir=RUNTIME_VARS.TMP_CONF_DIR,
            daemon_class=SaltMinion,
            bin_dir_path=SCRIPT_DIR,
            fail_hard=True,
            event_listener_config_dir=RUNTIME_VARS.TMP_CONF_DIR,
            start_timeout=120,
        )

        self.sub_minion_process = self._start_daemon_with_status(
            "sub salt-minion",
            daemon_name="sub salt-minion",
            daemon_id=self.master_opts["id"],
            daemon_log_prefix="sub-salt-minion/{}".format(self.sub_minion_opts["id"]),
            daemon_cli_script_name="minion",
            daemon_config=self.sub_minion_opts,
            daemon_config_dir=RUNTIME_VARS.TMP_SUB_MINION_CONF_DIR,
            daemon_class=SaltMinion,
            bin_dir_path=SCRIPT_DIR,
            fail_hard=True,
            event_listener_config_dir=RUNTIME_VARS.TMP_CONF_DIR,
            start_timeout=120,
        )

        # Roster files must be in place before the syndic master comes up.
        self.prep_syndic()
        self.smaster_process = self._start_daemon_with_status(
            "syndic salt-master",
            daemon_name="salt-smaster",
            daemon_id=self.syndic_master_opts["id"],
            daemon_log_prefix="salt-smaster/{}".format(self.syndic_master_opts["id"]),
            daemon_cli_script_name="master",
            daemon_config=self.syndic_master_opts,
            daemon_config_dir=RUNTIME_VARS.TMP_SYNDIC_MASTER_CONF_DIR,
            daemon_class=SaltMaster,
            bin_dir_path=SCRIPT_DIR,
            fail_hard=True,
            event_listener_config_dir=RUNTIME_VARS.TMP_SYNDIC_MASTER_CONF_DIR,
            start_timeout=120,
        )

        self.syndic_process = self._start_daemon_with_status(
            "salt-syndic",
            daemon_name="salt-syndic",
            daemon_id=self.syndic_opts["id"],
            daemon_log_prefix="salt-syndic/{}".format(self.syndic_opts["id"]),
            daemon_cli_script_name="syndic",
            daemon_config=self.syndic_opts,
            daemon_config_dir=RUNTIME_VARS.TMP_SYNDIC_MINION_CONF_DIR,
            daemon_class=SaltSyndic,
            bin_dir_path=SCRIPT_DIR,
            fail_hard=True,
            event_listener_config_dir=RUNTIME_VARS.TMP_CONF_DIR,
            start_timeout=120,
        )

        if self.parser.options.proxy:
            self.minion_targets.add(self.proxy_opts["id"])
            self.proxy_process = self._start_daemon_with_status(
                "salt-proxy",
                daemon_name="salt-proxy",
                daemon_id=self.proxy_opts["id"],
                daemon_log_prefix="salt-proxy/{}".format(self.proxy_opts["id"]),
                daemon_cli_script_name="proxy",
                daemon_config=self.proxy_opts,
                daemon_config_dir=RUNTIME_VARS.TMP_CONF_DIR,
                daemon_class=SaltProxy,
                bin_dir_path=SCRIPT_DIR,
                fail_hard=True,
                start_timeout=120,
            )

    # The TCP transport uses the very same startup sequence.
    start_tcp_daemons = start_zeromq_daemons

    def prep_syndic(self):
        """
        Copy the pre-baked SSH roster file into the syndic's config
        directories (main and syndic-master).
        """
        source_roster = os.path.join(FILES, "conf/_ssh/roster")
        for destination in (
            RUNTIME_VARS.TMP_CONF_DIR,
            RUNTIME_VARS.TMP_SYNDIC_MASTER_CONF_DIR,
        ):
            shutil.copy(source_roster, destination)

    def _run_ssh_keygen(self, keygen, key_type, bits, filename, cwd):
        """
        Run ssh-keygen to create a password-less key pair named *filename*
        inside *cwd*.  Prints any stderr output and returns it (empty bytes
        on success), so callers can decide whether the key is usable.
        """
        keygen_process = subprocess.Popen(
            [
                keygen,
                "-t",
                key_type,
                "-b",
                bits,
                "-C",
                # NOTE(review): passed literally (shell=False), so the
                # $(...) substitutions are never expanded — preserved as-is.
                '"$(whoami)@$(hostname)-$(date -I)"',
                "-f",
                filename,
                "-P",
                "",
            ],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            close_fds=True,
            cwd=cwd,
        )
        _, keygen_err = keygen_process.communicate()
        if keygen_err:
            print(
                "ssh-keygen had errors: {}".format(
                    salt.utils.stringutils.to_str(keygen_err)
                )
            )
        return keygen_err

    def prep_ssh(self):
        """
        Generate client and host keys and start an ssh daemon on an
        alternate port for the salt-ssh tests.  Sets SSH_DAEMON_RUNNING in
        the environment when sshd came up cleanly.
        """
        sys.stdout.write(
            " * {LIGHT_GREEN}Starting {0} ... {ENDC}".format(
                "SSH server", **self.colors
            )
        )
        keygen = salt.utils.path.which("ssh-keygen")
        sshd = salt.utils.path.which("sshd")

        if not (keygen and sshd):
            print(
                "WARNING: Could not initialize SSH subsystem. Tests for salt-ssh may break!"
            )
            return
        if not os.path.exists(RUNTIME_VARS.TMP_CONF_DIR):
            os.makedirs(RUNTIME_VARS.TMP_CONF_DIR)

        # Generate client key, removing any stale one first
        pub_key_test_file = os.path.join(RUNTIME_VARS.TMP_CONF_DIR, "key_test.pub")
        priv_key_test_file = os.path.join(RUNTIME_VARS.TMP_CONF_DIR, "key_test")
        for stale_key_file in (pub_key_test_file, priv_key_test_file):
            if os.path.exists(stale_key_file):
                os.remove(stale_key_file)
        self._run_ssh_keygen(
            keygen, "ecdsa", "521", "key_test", RUNTIME_VARS.TMP_CONF_DIR
        )

        sshd_config_path = os.path.join(FILES, "conf/_ssh/sshd_config")
        shutil.copy(sshd_config_path, RUNTIME_VARS.TMP_CONF_DIR)
        auth_key_file = os.path.join(RUNTIME_VARS.TMP_CONF_DIR, "key_test.pub")

        # Generate server host keys
        server_key_dir = os.path.join(RUNTIME_VARS.TMP_CONF_DIR, "server")
        if not os.path.exists(server_key_dir):
            os.makedirs(server_key_dir)
        server_dsa_priv_key_file = os.path.join(server_key_dir, "ssh_host_dsa_key")
        server_dsa_pub_key_file = os.path.join(server_key_dir, "ssh_host_dsa_key.pub")
        server_ecdsa_priv_key_file = os.path.join(server_key_dir, "ssh_host_ecdsa_key")
        server_ecdsa_pub_key_file = os.path.join(
            server_key_dir, "ssh_host_ecdsa_key.pub"
        )
        server_ed25519_priv_key_file = os.path.join(
            server_key_dir, "ssh_host_ed25519_key"
        )
        # Bug fix: this was "ssh_host.ed25519_key.pub" (dot instead of
        # underscore), which never matched the file ssh-keygen writes, so
        # the stale ed25519 public key was never removed.
        server_ed25519_pub_key_file = os.path.join(
            server_key_dir, "ssh_host_ed25519_key.pub"
        )

        for server_key_file in (
            server_dsa_priv_key_file,
            server_dsa_pub_key_file,
            server_ecdsa_priv_key_file,
            server_ecdsa_pub_key_file,
            server_ed25519_priv_key_file,
            server_ed25519_pub_key_file,
        ):
            if os.path.exists(server_key_file):
                os.remove(server_key_file)

        keygen_dsa_err = self._run_ssh_keygen(
            keygen, "dsa", "1024", "ssh_host_dsa_key", server_key_dir
        )
        keygen_escda_err = self._run_ssh_keygen(
            keygen, "ecdsa", "521", "ssh_host_ecdsa_key", server_key_dir
        )
        # "-b 521" is preserved from the original invocation even though
        # ed25519 keys have a fixed size.
        keygen_ed25519_err = self._run_ssh_keygen(
            keygen, "ed25519", "521", "ssh_host_ed25519_key", server_key_dir
        )

        # Point sshd at the generated keys; only advertise host keys whose
        # generation produced no errors.
        with salt.utils.files.fopen(
            os.path.join(RUNTIME_VARS.TMP_CONF_DIR, "sshd_config"), "a"
        ) as ssh_config:
            ssh_config.write("AuthorizedKeysFile {}\n".format(auth_key_file))
            if not keygen_dsa_err:
                ssh_config.write("HostKey {}\n".format(server_dsa_priv_key_file))
            if not keygen_escda_err:
                ssh_config.write("HostKey {}\n".format(server_ecdsa_priv_key_file))
            if not keygen_ed25519_err:
                ssh_config.write("HostKey {}\n".format(server_ed25519_priv_key_file))

        self.sshd_pidfile = os.path.join(RUNTIME_VARS.TMP_CONF_DIR, "sshd.pid")
        self.sshd_process = subprocess.Popen(
            [sshd, "-f", "sshd_config", "-o", "PidFile={}".format(self.sshd_pidfile)],
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            close_fds=True,
            cwd=RUNTIME_VARS.TMP_CONF_DIR,
        )
        # communicate() returns once the foreground sshd process exits
        # (sshd daemonizes itself).
        _, sshd_err = self.sshd_process.communicate()
        if sshd_err:
            print(
                "sshd had errors on startup: {}".format(
                    salt.utils.stringutils.to_str(sshd_err)
                )
            )
        else:
            os.environ["SSH_DAEMON_RUNNING"] = "True"
        self.prep_syndic()
        # Append the test user and client key to the roster copied above.
        with salt.utils.files.fopen(
            os.path.join(RUNTIME_VARS.TMP_CONF_DIR, "roster"), "a"
        ) as roster:
            roster.write("  user: {}\n".format(RUNTIME_VARS.RUNNING_TESTS_USER))
            roster.write(
                "  priv: {}/{}\n".format(RUNTIME_VARS.TMP_CONF_DIR, "key_test")
            )
            if salt.utils.platform.is_darwin():
                roster.write("  set_path: $PATH:/usr/local/bin/\n")
        sys.stdout.write(" {LIGHT_GREEN}STARTED!\n{ENDC}".format(**self.colors))

    @classmethod
    def config(cls, role):
        """
        Return a configuration for a master/minion/syndic.

        Currently these roles are:
            * master
            * minion
            * syndic
            * syndic_master
            * sub_minion
            * proxy

        Raises KeyError if no configuration is registered under ``role``.
        """
        # Configurations live in the shared RUNTIME_VARS.RUNTIME_CONFIGS
        # mapping; presumably populated during suite setup (not visible here).
        return RUNTIME_VARS.RUNTIME_CONFIGS[role]

    @classmethod
    def config_location(cls):
        """
        Return the directory holding the transplanted test configuration
        files (see transplant_configs).
        """
        return RUNTIME_VARS.TMP_CONF_DIR

    @property
    def client(self):
        """
        Lazily-created local client used e.g. to ping and sync the test
        minions.

        Creation is deferred until first access (and the instance cached in
        RUNTIME_VARS.RUNTIME_CONFIGS) because a client created during
        ``__enter__`` would not receive the master events.
        """
        try:
            return RUNTIME_VARS.RUNTIME_CONFIGS["runtime_client"]
        except KeyError:
            runtime_client = salt.client.get_local_client(mopts=self.master_opts)
            RUNTIME_VARS.RUNTIME_CONFIGS["runtime_client"] = runtime_client
            return runtime_client

    @classmethod
    def transplant_configs(cls, transport="zeromq"):
        if os.path.isdir(RUNTIME_VARS.TMP):
            shutil.rmtree(RUNTIME_VARS.TMP)
        os.makedirs(RUNTIME_VARS.TMP)
        os.makedirs(RUNTIME_VARS.TMP_ROOT_DIR)
        os.makedirs(RUNTIME_VARS.TMP_CONF_DIR)
        os.makedirs(RUNTIME_VARS.TMP_SUB_MINION_CONF_DIR)
        os.makedirs(RUNTIME_VARS.TMP_SYNDIC_MASTER_CONF_DIR)
        os.makedirs(RUNTIME_VARS.TMP_SYNDIC_MINION_CONF_DIR)

        print(
            " * Transplanting configuration files to '{}'".format(
                RUNTIME_VARS.TMP_CONF_DIR
            )
        )
        tests_known_hosts_file = os.path.join(
            RUNTIME_VARS.TMP_CONF_DIR, "salt_ssh_known_hosts"
        )
        with salt.utils.files.fopen(tests_known_hosts_file, "w") as known_hosts:
            known_hosts.write("")

        # This master connects to syndic_master via a syndic
        master_opts = salt.config._read_conf_file(
            os.path.join(RUNTIME_VARS.CONF_DIR, "master")
        )
        master_opts["known_hosts_file"] = tests_known_hosts_file
        master_opts["cachedir"] = "cache"
        master_opts["user"] = RUNTIME_VARS.RUNNING_TESTS_USER
        master_opts["root_dir"] = os.path.join(TMP_ROOT_DIR)
        master_opts["pki_dir"] = "pki"
        master_opts["syndic_master"] = "localhost"
        pytest_stop_sending_events_file = os.path.join(
            TMP_ROOT_DIR, "pytest_stop_sending_events_file_master"
        )
        with salt.utils.files.fopen(pytest_stop_sending_events_file, "w") as wfh:
            wfh.write("")
        master_opts["pytest_stop_sending_events_file"] = pytest_stop_sending_events_file
        file_tree = {
            "root_dir": os.path.join(FILES, "pillar", "base", "file_tree"),
            "follow_dir_links": False,
            "keep_newline": True,
        }
        master_opts["ext_pillar"].append({"file_tree": file_tree})

        # Config settings to test `event_return`
        if "returner_dirs" not in master_opts:
            master_opts["returner_dirs"] = []
        master_opts["returner_dirs"].append(
            os.path.join(RUNTIME_VARS.FILES, "returners")
        )
        master_opts["event_return"] = "runtests_noop"

        # Under windows we can't seem to properly create a virtualenv off of another
        # virtualenv, we can on linux but we will still point to the virtualenv binary
        # outside the virtualenv running the test suite, if that's the case.
        try:
            real_prefix = sys.real_prefix
            # The above attribute exists, this is a virtualenv
            if salt.utils.platform.is_windows():
                virtualenv_binary = os.path.join(
                    real_prefix, "Scripts", "virtualenv.exe"
                )
            else:
                # We need to remove the virtualenv from PATH or we'll get the virtualenv binary
                # from within the virtualenv, we don't want that
                path = os.environ.get("PATH")
                if path is not None:
                    path_items = path.split(os.pathsep)
                    for item in path_items[:]:
                        if item.startswith(sys.base_prefix):
                            path_items.remove(item)
                    os.environ["PATH"] = os.pathsep.join(path_items)
                virtualenv_binary = salt.utils.path.which("virtualenv")
                if path is not None:
                    # Restore previous environ PATH
                    os.environ["PATH"] = path
                if not virtualenv_binary.startswith(real_prefix):
                    virtualenv_binary = None
            if virtualenv_binary and not os.path.exists(virtualenv_binary):
                # It doesn't exist?!
                virtualenv_binary = None
        except AttributeError:
            # We're not running inside a virtualenv
            virtualenv_binary = None

        # This minion connects to master
        minion_opts = salt.config._read_conf_file(
            os.path.join(RUNTIME_VARS.CONF_DIR, "minion")
        )
        minion_opts["cachedir"] = "cache"
        minion_opts["user"] = RUNTIME_VARS.RUNNING_TESTS_USER
        minion_opts["root_dir"] = os.path.join(TMP_ROOT_DIR)
        minion_opts["pki_dir"] = "pki"
        minion_opts["hosts.file"] = os.path.join(TMP_ROOT_DIR, "hosts")
        minion_opts["aliases.file"] = os.path.join(TMP_ROOT_DIR, "aliases")
        if virtualenv_binary:
            minion_opts["venv_bin"] = virtualenv_binary

        # This sub_minion also connects to master
        sub_minion_opts = salt.config._read_conf_file(
            os.path.join(RUNTIME_VARS.CONF_DIR, "sub_minion")
        )
        sub_minion_opts["cachedir"] = "cache"
        sub_minion_opts["user"] = RUNTIME_VARS.RUNNING_TESTS_USER
        sub_minion_opts["root_dir"] = os.path.join(TMP, "rootdir-sub-minion")
        sub_minion_opts["pki_dir"] = "pki"
        sub_minion_opts["hosts.file"] = os.path.join(TMP_ROOT_DIR, "hosts")
        sub_minion_opts["aliases.file"] = os.path.join(TMP_ROOT_DIR, "aliases")
        if virtualenv_binary:
            sub_minion_opts["venv_bin"] = virtualenv_binary

        # This is the master of masters
        syndic_master_opts = salt.config._read_conf_file(
            os.path.join(RUNTIME_VARS.CONF_DIR, "syndic_master")
        )
        syndic_master_opts["cachedir"] = "cache"
        syndic_master_opts["user"] = RUNTIME_VARS.RUNNING_TESTS_USER
        syndic_master_opts["root_dir"] = os.path.join(TMP, "rootdir-syndic-master")
        syndic_master_opts["pki_dir"] = "pki"
        pytest_stop_sending_events_file = os.path.join(
            TMP_ROOT_DIR, "pytest_stop_sending_events_file_syndic_master"
        )
        with salt.utils.files.fopen(pytest_stop_sending_events_file, "w") as wfh:
            wfh.write("")
        syndic_master_opts[
            "pytest_stop_sending_events_file"
        ] = pytest_stop_sending_events_file

        # This is the syndic for master
        # Let's start with a copy of the syndic master configuration
        syndic_opts = copy.deepcopy(syndic_master_opts)
        # Let's update with the syndic configuration
        syndic_opts.update(
            salt.config._read_conf_file(os.path.join(RUNTIME_VARS.CONF_DIR, "syndic"))
        )
        syndic_opts["cachedir"] = "cache"
        syndic_opts["root_dir"] = os.path.join(TMP_ROOT_DIR)

        # This is the syndic for master
        # Let's start with a copy of the syndic master configuration
        syndic_opts = copy.deepcopy(syndic_master_opts)
        # Let's update with the syndic configuration
        syndic_opts.update(
            salt.config._read_conf_file(os.path.join(RUNTIME_VARS.CONF_DIR, "syndic"))
        )
        syndic_opts["config_dir"] = RUNTIME_VARS.TMP_SYNDIC_MINION_CONF_DIR
        syndic_opts["cachedir"] = os.path.join(TMP, "rootdir", "cache")
        syndic_opts["root_dir"] = os.path.join(TMP, "rootdir")

        # This proxy connects to master
        proxy_opts = salt.config._read_conf_file(os.path.join(CONF_DIR, "proxy"))
        proxy_opts["cachedir"] = "cache"
        # proxy_opts['user'] = running_tests_user
        proxy_opts["root_dir"] = os.path.join(TMP, "rootdir-proxy")
        proxy_opts["pki_dir"] = "pki"
        proxy_opts["hosts.file"] = os.path.join(TMP, "rootdir-proxy", "hosts")
        proxy_opts["aliases.file"] = os.path.join(TMP, "rootdir-proxy", "aliases")

        if transport == "tcp":
            master_opts["transport"] = "tcp"
            minion_opts["transport"] = "tcp"
            sub_minion_opts["transport"] = "tcp"
            syndic_master_opts["transport"] = "tcp"
            proxy_opts["transport"] = "tcp"

        # This is the syndic for master
        # Let's start with a copy of the syndic master configuration
        syndic_opts = copy.deepcopy(master_opts)
        # Let's update with the syndic configuration
        syndic_opts.update(
            salt.config._read_conf_file(os.path.join(RUNTIME_VARS.CONF_DIR, "syndic"))
        )
        syndic_opts["cachedir"] = os.path.join(TMP, "rootdir", "cache")
        syndic_opts["config_dir"] = RUNTIME_VARS.TMP_SYNDIC_MINION_CONF_DIR

        # Set up config options that require internal data
        master_opts["pillar_roots"] = syndic_master_opts["pillar_roots"] = {
            "base": [
                RUNTIME_VARS.TMP_PILLAR_TREE,
                os.path.join(FILES, "pillar", "base"),
            ]
        }
        minion_opts["pillar_roots"] = {
            "base": [
                RUNTIME_VARS.TMP_PILLAR_TREE,
                os.path.join(FILES, "pillar", "base"),
            ]
        }
        master_opts["file_roots"] = syndic_master_opts["file_roots"] = {
            "base": [
                # Let's support runtime created files that can be used like:
                #   salt://my-temp-file.txt
                RUNTIME_VARS.TMP_STATE_TREE,
                os.path.join(FILES, "file", "base"),
            ],
            # Alternate root to test __env__ choices
            "prod": [
                os.path.join(FILES, "file", "prod"),
                RUNTIME_VARS.TMP_PRODENV_STATE_TREE,
            ],
        }
        minion_opts["file_roots"] = {
            "base": [
                # Let's support runtime created files that can be used like:
                #   salt://my-temp-file.txt
                RUNTIME_VARS.TMP_STATE_TREE,
                os.path.join(FILES, "file", "base"),
            ],
            # Alternate root to test __env__ choices
            "prod": [
                os.path.join(FILES, "file", "prod"),
                RUNTIME_VARS.TMP_PRODENV_STATE_TREE,
            ],
        }
        master_opts.setdefault("reactor", []).append(
            {"salt/minion/*/start": [os.path.join(FILES, "reactor-sync-minion.sls")]}
        )
        for opts_dict in (master_opts, syndic_master_opts):
            if "ext_pillar" not in opts_dict:
                opts_dict["ext_pillar"] = []
            if salt.utils.platform.is_windows():
                opts_dict["ext_pillar"].append(
                    {"cmd_yaml": "type {}".format(os.path.join(FILES, "ext.yaml"))}
                )
            else:
                opts_dict["ext_pillar"].append(
                    {"cmd_yaml": "cat {}".format(os.path.join(FILES, "ext.yaml"))}
                )

        # all read, only owner write
        autosign_file_permissions = (
            stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH | stat.S_IWUSR
        )
        for opts_dict in (master_opts, syndic_master_opts):
            # We need to copy the extension modules into the new master root_dir or
            # it will be prefixed by it
            new_extension_modules_path = os.path.join(
                opts_dict["root_dir"], "extension_modules"
            )
            if not os.path.exists(new_extension_modules_path):
                shutil.copytree(
                    os.path.join(INTEGRATION_TEST_DIR, "files", "extension_modules"),
                    new_extension_modules_path,
                )
            opts_dict["extension_modules"] = os.path.join(
                opts_dict["root_dir"], "extension_modules"
            )

            # Copy the autosign_file to the new  master root_dir
            new_autosign_file_path = os.path.join(
                opts_dict["root_dir"], "autosign_file"
            )
            shutil.copyfile(
                os.path.join(INTEGRATION_TEST_DIR, "files", "autosign_file"),
                new_autosign_file_path,
            )
            os.chmod(new_autosign_file_path, autosign_file_permissions)

        # Point the config values to the correct temporary paths
        for name in ("hosts", "aliases"):
            optname = "{}.file".format(name)
            optname_path = os.path.join(TMP, name)
            master_opts[optname] = optname_path
            minion_opts[optname] = optname_path
            sub_minion_opts[optname] = optname_path
            syndic_opts[optname] = optname_path
            syndic_master_opts[optname] = optname_path
            proxy_opts[optname] = optname_path

        master_opts["runtests_conn_check_port"] = get_unused_localhost_port()
        minion_opts["runtests_conn_check_port"] = get_unused_localhost_port()
        sub_minion_opts["runtests_conn_check_port"] = get_unused_localhost_port()
        syndic_opts["runtests_conn_check_port"] = get_unused_localhost_port()
        syndic_master_opts["runtests_conn_check_port"] = get_unused_localhost_port()
        proxy_opts["runtests_conn_check_port"] = get_unused_localhost_port()

        for conf in (
            master_opts,
            minion_opts,
            sub_minion_opts,
            syndic_opts,
            syndic_master_opts,
            proxy_opts,
        ):
            if "engines" not in conf:
                conf["engines"] = []
            conf["engines"].append({"salt_runtests": {}})

            if "engines_dirs" not in conf:
                conf["engines_dirs"] = []

            conf["engines_dirs"].insert(0, ENGINES_DIR)

            if "log_handlers_dirs" not in conf:
                conf["log_handlers_dirs"] = []
            conf["log_handlers_dirs"].insert(0, LOG_HANDLERS_DIR)
            conf["runtests_log_port"] = SALT_LOG_PORT
            conf["runtests_log_level"] = (
                os.environ.get("TESTS_MIN_LOG_LEVEL_NAME") or "debug"
            )

        # ----- Transcribe Configuration ---------------------------------------------------------------------------->
        for entry in os.listdir(RUNTIME_VARS.CONF_DIR):
            if entry in (
                "master",
                "minion",
                "sub_minion",
                "syndic",
                "syndic_master",
                "proxy",
            ):
                # These have runtime computed values and will be handled
                # differently
                continue
            entry_path = os.path.join(RUNTIME_VARS.CONF_DIR, entry)
            if os.path.isfile(entry_path):
                shutil.copy(entry_path, os.path.join(RUNTIME_VARS.TMP_CONF_DIR, entry))
            elif os.path.isdir(entry_path):
                shutil.copytree(
                    entry_path, os.path.join(RUNTIME_VARS.TMP_CONF_DIR, entry)
                )

        for entry in (
            "master",
            "minion",
            "sub_minion",
            "syndic",
            "syndic_master",
            "proxy",
        ):
            computed_config = copy.deepcopy(locals()["{}_opts".format(entry)])
            with salt.utils.files.fopen(
                os.path.join(RUNTIME_VARS.TMP_CONF_DIR, entry), "w"
            ) as fp_:
                salt.utils.yaml.safe_dump(
                    computed_config, fp_, default_flow_style=False
                )
        sub_minion_computed_config = copy.deepcopy(sub_minion_opts)
        with salt.utils.files.fopen(
            os.path.join(RUNTIME_VARS.TMP_SUB_MINION_CONF_DIR, "minion"), "w"
        ) as wfh:
            salt.utils.yaml.safe_dump(
                sub_minion_computed_config, wfh, default_flow_style=False
            )
        shutil.copyfile(
            os.path.join(RUNTIME_VARS.TMP_CONF_DIR, "master"),
            os.path.join(RUNTIME_VARS.TMP_SUB_MINION_CONF_DIR, "master"),
        )

        syndic_master_computed_config = copy.deepcopy(syndic_master_opts)
        with salt.utils.files.fopen(
            os.path.join(RUNTIME_VARS.TMP_SYNDIC_MASTER_CONF_DIR, "master"), "w"
        ) as wfh:
            salt.utils.yaml.safe_dump(
                syndic_master_computed_config, wfh, default_flow_style=False
            )
        syndic_computed_config = copy.deepcopy(syndic_opts)
        with salt.utils.files.fopen(
            os.path.join(RUNTIME_VARS.TMP_SYNDIC_MINION_CONF_DIR, "minion"), "w"
        ) as wfh:
            salt.utils.yaml.safe_dump(
                syndic_computed_config, wfh, default_flow_style=False
            )
        shutil.copyfile(
            os.path.join(RUNTIME_VARS.TMP_CONF_DIR, "master"),
            os.path.join(RUNTIME_VARS.TMP_SYNDIC_MINION_CONF_DIR, "master"),
        )
        # <---- Transcribe Configuration -----------------------------------------------------------------------------

        # ----- Verify Environment ---------------------------------------------------------------------------------->
        master_opts = salt.config.master_config(
            os.path.join(RUNTIME_VARS.TMP_CONF_DIR, "master")
        )
        minion_opts = salt.config.minion_config(
            os.path.join(RUNTIME_VARS.TMP_CONF_DIR, "minion")
        )
        syndic_opts = salt.config.syndic_config(
            os.path.join(RUNTIME_VARS.TMP_SYNDIC_MINION_CONF_DIR, "master"),
            os.path.join(RUNTIME_VARS.TMP_SYNDIC_MINION_CONF_DIR, "minion"),
        )
        sub_minion_opts = salt.config.minion_config(
            os.path.join(RUNTIME_VARS.TMP_SUB_MINION_CONF_DIR, "minion")
        )
        syndic_master_opts = salt.config.master_config(
            os.path.join(RUNTIME_VARS.TMP_SYNDIC_MASTER_CONF_DIR, "master")
        )
        proxy_opts = salt.config.proxy_config(
            os.path.join(RUNTIME_VARS.TMP_CONF_DIR, "proxy")
        )

        RUNTIME_VARS.RUNTIME_CONFIGS["master"] = freeze(master_opts)
        RUNTIME_VARS.RUNTIME_CONFIGS["minion"] = freeze(minion_opts)
        RUNTIME_VARS.RUNTIME_CONFIGS["syndic"] = freeze(syndic_opts)
        RUNTIME_VARS.RUNTIME_CONFIGS["sub_minion"] = freeze(sub_minion_opts)
        RUNTIME_VARS.RUNTIME_CONFIGS["syndic_master"] = freeze(syndic_master_opts)
        RUNTIME_VARS.RUNTIME_CONFIGS["proxy"] = freeze(proxy_opts)

        verify_env(
            [
                os.path.join(master_opts["pki_dir"], "minions"),
                os.path.join(master_opts["pki_dir"], "minions_pre"),
                os.path.join(master_opts["pki_dir"], "minions_rejected"),
                os.path.join(master_opts["pki_dir"], "minions_denied"),
                os.path.join(master_opts["cachedir"], "jobs"),
                os.path.join(master_opts["root_dir"], "cache", "tokens"),
                os.path.join(syndic_master_opts["pki_dir"], "minions"),
                os.path.join(syndic_master_opts["pki_dir"], "minions_pre"),
                os.path.join(syndic_master_opts["pki_dir"], "minions_rejected"),
                os.path.join(syndic_master_opts["cachedir"], "jobs"),
                os.path.join(syndic_master_opts["root_dir"], "cache", "tokens"),
                os.path.join(master_opts["pki_dir"], "accepted"),
                os.path.join(master_opts["pki_dir"], "rejected"),
                os.path.join(master_opts["pki_dir"], "pending"),
                os.path.join(syndic_master_opts["pki_dir"], "accepted"),
                os.path.join(syndic_master_opts["pki_dir"], "rejected"),
                os.path.join(syndic_master_opts["pki_dir"], "pending"),
                os.path.join(minion_opts["pki_dir"], "accepted"),
                os.path.join(minion_opts["pki_dir"], "rejected"),
                os.path.join(minion_opts["pki_dir"], "pending"),
                os.path.join(sub_minion_opts["pki_dir"], "accepted"),
                os.path.join(sub_minion_opts["pki_dir"], "rejected"),
                os.path.join(sub_minion_opts["pki_dir"], "pending"),
                os.path.dirname(master_opts["log_file"]),
                minion_opts["extension_modules"],
                sub_minion_opts["extension_modules"],
                sub_minion_opts["pki_dir"],
                proxy_opts["pki_dir"],
                master_opts["sock_dir"],
                syndic_master_opts["sock_dir"],
                sub_minion_opts["sock_dir"],
                minion_opts["sock_dir"],
                RUNTIME_VARS.TMP_STATE_TREE,
                RUNTIME_VARS.TMP_PILLAR_TREE,
                RUNTIME_VARS.TMP_PRODENV_STATE_TREE,
                TMP,
            ],
            RUNTIME_VARS.RUNNING_TESTS_USER,
            root_dir=master_opts["root_dir"],
        )

        cls.master_opts = master_opts
        cls.minion_opts = minion_opts
        # cls.proxy_opts = proxy_opts
        cls.sub_minion_opts = sub_minion_opts
        cls.syndic_opts = syndic_opts
        cls.syndic_master_opts = syndic_master_opts
        cls.proxy_opts = proxy_opts
        # <---- Verify Environment -----------------------------------------------------------------------------------

    def __exit__(self, type, value, traceback):
        """
        Kill the minion and master processes.

        Terminates, in order, the sub minion, minion, proxy, master,
        syndic and syndic master processes (silently skipping any that
        were never started), restores the mockbin PATH entry, stops the
        test sshd server and shuts down multiprocessing logging and the
        log server.
        """
        # Minions go down first so they do not spam reconnect noise while
        # the masters are being stopped.
        for proc_attr in ("sub_minion_process", "minion_process"):
            try:
                process = getattr(self, proc_attr)
                if hasattr(process, "terminate"):
                    process.terminate()
                else:
                    log.error("self.%s can't be terminated.", proc_attr)
            except AttributeError:
                # The process was never started
                pass

        if hasattr(self, "proxy_process"):
            self.proxy_process.terminate()

        try:
            if hasattr(self.master_process, "terminate"):
                self.master_process.terminate()
            else:
                log.error("self.master_process can't be terminated.")
        except AttributeError:
            pass

        # The syndic processes may not exist depending on the requested
        # setup; a missing attribute is not an error.
        for proc_attr in ("syndic_process", "smaster_process"):
            try:
                getattr(self, proc_attr).terminate()
            except AttributeError:
                pass

        self._exit_mockbin()
        self._exit_ssh()
        # Shutdown the multiprocessing logging queue listener
        salt_log_setup.shutdown_multiprocessing_logging()
        salt_log_setup.shutdown_multiprocessing_logging_listener(daemonizing=True)
        # Shutdown the log server
        self.log_server.shutdown()
        self.log_server.server_close()
        self.log_server_process.join()

    def pre_setup_minions(self):
        """
        Hook executed before the minions are set up.

        Subclasses may override this to perform additional preparation.
        """

    def setup_minions(self):
        """
        Hook holding the minion setup routines.

        The base implementation is a no-op; subclasses provide behavior.
        """

    def post_setup_minions(self):
        """
        Hook executed after the minions have been set up.

        Subclasses may override this to run follow-up code.
        """

    def _enter_mockbin(self):
        """
        Prepend the mock binaries directory to the ``PATH`` environment
        variable, unless it is already present.
        """
        entries = os.environ.get("PATH", "").split(os.pathsep)
        if MOCKBIN not in entries:
            entries = [MOCKBIN] + entries
        os.environ["PATH"] = os.pathsep.join(entries)

    def _exit_ssh(self):
        """
        Stop the test sshd server, if one was started.

        Kills the tracked subprocess and, as a safety net, any pid still
        recorded in the sshd pid file.  A process that is already gone
        (``ESRCH``) is silently ignored.
        """
        if hasattr(self, "sshd_process"):
            try:
                self.sshd_process.kill()
            except OSError as exc:
                # errno.ESRCH: no such process -- it already exited
                if exc.errno != errno.ESRCH:
                    raise
            if os.path.exists(self.sshd_pidfile):
                with salt.utils.files.fopen(self.sshd_pidfile) as fhr:
                    try:
                        os.kill(int(fhr.read()), signal.SIGKILL)
                    except OSError as exc:
                        if exc.errno != errno.ESRCH:
                            raise

    def _exit_mockbin(self):
        """
        Remove the mock binaries directory from the ``PATH`` environment
        variable, if present.
        """
        entries = os.environ.get("PATH", "").split(os.pathsep)
        if MOCKBIN in entries:
            entries.remove(MOCKBIN)
        os.environ["PATH"] = os.pathsep.join(entries)

    @classmethod
    def clean(cls):
        """
        Clean out the tmp files
        """

        def _force_writable_and_retry(func, path, excinfo):
            # shutil.rmtree error hook: grant the owner full permissions
            # and retry the failed operation on read-only entries.
            if os.path.exists(path):
                os.chmod(path, stat.S_IRWXU)
                func(path)

        candidates = (
            TMP,
            RUNTIME_VARS.TMP_STATE_TREE,
            RUNTIME_VARS.TMP_PILLAR_TREE,
            RUNTIME_VARS.TMP_PRODENV_STATE_TREE,
        )
        for candidate in candidates:
            if not os.path.isdir(candidate):
                continue
            try:
                shutil.rmtree(str(candidate), onerror=_force_writable_and_retry)
            except Exception:  # pylint: disable=broad-except
                log.exception("Failed to remove directory: %s", candidate)

    def wait_for_jid(self, targets, jid, timeout=120):
        """
        Block until job ``jid`` is no longer running on any of ``targets``.

        Polls ``saltutil.running`` roughly once per second.  A job is only
        considered finished after two consecutive polls report it gone, to
        avoid false positives from minions that have not picked the job up
        yet.  Returns ``True`` when the job finished within ``timeout``
        seconds, ``False`` otherwise.
        """
        time.sleep(1)  # Allow some time for minions to accept jobs
        now = datetime.now()
        expire = now + timedelta(seconds=timeout)
        # Set on the first poll that reports the job gone; a second
        # consecutive "gone" poll confirms completion.
        job_finished = False
        while now <= expire:
            running = self.__client_job_running(targets, jid)
            # Blank out the current terminal line before re-printing status
            sys.stdout.write(
                "\r{}\r".format(
                    " " * getattr(self.parser.options, "output_columns", PNUM)
                )
            )
            if not running and job_finished is False:
                # Let's not have false positives and wait one more seconds
                job_finished = True
            elif not running and job_finished is True:
                return True
            elif running and job_finished is True:
                # The job showed up again; reset the confirmation state
                job_finished = False

            if job_finished is False:
                # Remaining time, with sub-second precision stripped
                sys.stdout.write(
                    "   * {LIGHT_YELLOW}[Quit in {0}]{ENDC} Waiting for {1}".format(
                        "{}".format(expire - now).rsplit(".", 1)[0],
                        ", ".join(running),
                        **self.colors
                    )
                )
                sys.stdout.flush()
            time.sleep(1)
            now = datetime.now()
        else:  # pylint: disable=W0120
            # Deadline expired without two consecutive "finished" polls
            sys.stdout.write(
                "\n {LIGHT_RED}*{ENDC} ERROR: Failed to get information "
                "back\n".format(**self.colors)
            )
            sys.stdout.flush()
        return False

    def __client_job_running(self, targets, jid):
        """
        Return the list of targets on which job ``jid`` is still running.
        """
        status = self.client.cmd(list(targets), "saltutil.running", tgt_type="list")
        still_running = []
        for minion_id, jobs in status.items():
            if jobs and jobs[0]["jid"] == jid:
                still_running.append(minion_id)
        return still_running

    def sync_minion_modules_(self, modules_kind, targets, timeout=None):
        """
        Run ``saltutil.sync_<modules_kind>`` on ``targets`` and wait until
        every target has reported back.

        Exits the test run (``SystemExit``) if the sync job does not finish
        within ``timeout`` seconds (default 120).  Returns ``False`` if any
        minion reported a sync error, ``True`` otherwise.
        """
        if not timeout:
            timeout = 120
        # Let's sync all connected minions
        print(
            " {LIGHT_BLUE}*{ENDC} Syncing minion's {1} "
            "(saltutil.sync_{1})".format(
                ", ".join(targets), modules_kind, **self.colors
            )
        )
        syncing = set(targets)
        # The job itself gets an effectively infinite timeout; the real
        # deadline is enforced by wait_for_jid() below.
        jid_info = self.client.run_job(
            list(targets),
            "saltutil.sync_{}".format(modules_kind),
            tgt_type="list",
            timeout=999999999999999,
        )

        if self.wait_for_jid(targets, jid_info["jid"], timeout) is False:
            print(
                " {LIGHT_RED}*{ENDC} WARNING: Minions failed to sync {0}. "
                "Tests requiring these {0} WILL fail".format(
                    modules_kind, **self.colors
                )
            )
            raise SystemExit()

        # Collect returns until every target has been accounted for
        while syncing:
            rdata = self.client.get_full_returns(jid_info["jid"], syncing, 1)
            if rdata:
                for name, output in rdata.items():
                    if not output["ret"]:
                        # Already synced!?
                        syncing.remove(name)
                        continue

                    # A string return is an error message, not a file list
                    if isinstance(output["ret"], str):
                        # An error has occurred
                        print(
                            " {LIGHT_RED}*{ENDC} {0} Failed to sync {2}: "
                            "{1}".format(
                                name, output["ret"], modules_kind, **self.colors
                            )
                        )
                        return False

                    print(
                        "   {LIGHT_GREEN}*{ENDC} Synced {0} {2}: "
                        "{1}".format(
                            name, ", ".join(output["ret"]), modules_kind, **self.colors
                        )
                    )
                    # Synced!
                    try:
                        syncing.remove(name)
                    except KeyError:
                        # Duplicate return for a minion we already processed
                        print(
                            " {LIGHT_RED}*{ENDC} {0} already synced??? "
                            "{1}".format(name, output, **self.colors)
                        )
        return True

    def sync_minion_states(self, targets, timeout=None):
        """
        Sync state modules out to ``targets``.
        """
        salt.utils.process.appendproctitle("SyncMinionStates")
        self.sync_minion_modules_("states", targets, timeout=timeout)

    def sync_minion_modules(self, targets, timeout=None):
        """
        Sync execution modules out to ``targets``.
        """
        salt.utils.process.appendproctitle("SyncMinionModules")
        self.sync_minion_modules_("modules", targets, timeout=timeout)

    def sync_minion_grains(self, targets, timeout=None):
        """
        Sync grains modules out to ``targets``.
        """
        salt.utils.process.appendproctitle("SyncMinionGrains")
        self.sync_minion_modules_("grains", targets, timeout=timeout)

    def wait_for_minions(self, start, timeout, sleep=5):
        """
        Ensure all minions and masters (including sub-masters) are connected.

        Repeatedly pings all minions until every entry in
        ``self.minion_targets`` answers.  Raises ``RuntimeError`` when the
        full set has not connected within ``timeout`` seconds of ``start``.
        """
        while True:
            try:
                ret = self.client.run_job("*", "test.ping")
            except SaltClientError:
                ret = None
            if ret and sorted(ret.get("minions", [])) == sorted(self.minion_targets):
                return
            # Always check the deadline, even when the reply carried no
            # "minions" key -- the previous ``continue`` skipped both this
            # check and the sleep, looping forever on malformed replies.
            if time.time() - start >= timeout:
                raise RuntimeError("Ping Minions Failed")
            time.sleep(sleep)